package org.databandtech.mysql2hive;

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.databandtech.mysql2hive.sink.HiveSink;
import org.databandtech.mysql2hive.source.SourceFromMySQLByTuple5;

/**
 * Flink streaming job that reads rows from a MySQL table and writes them
 * into a Hive table via a JDBC-style insert sink.
 *
 * <p>Pipeline: {@code SourceFromMySQLByTuple5} (MySQL read) → console print
 * (debug) → {@code HiveSink} (Hive insert).
 */
public class MysqlToHive {

	// --- MySQL source configuration ---
	final static String URL = "jdbc:mysql://127.0.0.1:3307/databand?useUnicode=true&characterEncoding=utf-8&useSSL=false";
	final static String USER = "root";
	final static String PASS = "mysql";
	final static String SQL = "SELECT title,status,vid,cover_id,url from databand_video ORDER BY id DESC LIMIT 100";
	final static String[] COLUMNS_READ = new String[] { "title", "status", "vid", "cover_id", "url" };

	// --- Hive sink configuration ---
	/**
	 * Target table must exist in Hive; create it with:
	 * <pre>
	 * CREATE TABLE IF NOT EXISTS h_video(
	 *   title string,
	 *   status int,
	 *   vid string,
	 *   cover_id string,
	 *   url string
	 * )
	 * ROW FORMAT DELIMITED
	 * FIELDS TERMINATED BY ',';
	 * </pre>
	 */
	// FIX: removed stray trailing `\";` that was embedded inside the string
	// literal (copy-paste artifact) and would corrupt the prepared statement.
	final static String SQLSINK = "insert into h_video(title,status,vid,cover_id,url) values(?, ?, ?, ?, ?)";

	/**
	 * Job entry point: builds the Flink topology and executes it.
	 *
	 * @param args unused command-line arguments
	 */
	public static void main(String[] args) {

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// ######### Read from MySQL #########
		// Tuple5 fields mirror COLUMNS_READ: (title, status, vid, cover_id, url).
		DataStreamSource<Tuple5<String, Integer, String, String, String>> sourceStream = env
				.addSource(new SourceFromMySQLByTuple5(URL, USER, PASS, COLUMNS_READ, SQL));

		// Debug aid: echo every record to stdout.
		sourceStream.print();

		HiveSink sink = new HiveSink(SQLSINK);

		// ######### Write to Hive #########
		sourceStream.addSink(sink);

		try {
			env.execute("ok-hdfs");
		} catch (Exception e) {
			// Best-effort top-level handler: log the failure and exit;
			// there is no meaningful recovery for a failed batch job here.
			e.printStackTrace();
		}

	}

}
